package com.ouyunc.mq.config.rabbitmq.builder.impl;


import com.ouyunc.mq.config.rabbitmq.builder.RabbitMqBuilder;
import com.ouyunc.mq.config.rabbitmq.strategy.RabbitMqStrategy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * @Author fangzhenxun
 * @Description: 具体rabbitmq建造者
 * @Date 2020/2/28 21:02
 * @Version V1.0
 **/
@Slf4j
@Configuration
public class RabbitTemplateBuilder implements RabbitMqBuilder<RabbitTemplate> {


    /**
     * 获取当前选中的mq使用模式类型，如果没有设置primary则默认为单例模式类型
     **/
    @Value("${mq.rabbit.primary:STANDALONE}")
    private String type;

    /**
     * 获取所有rabbitmq的模式策略
     **/
    @Autowired
    private List<RabbitMqStrategy> rabbitMqStrategyList;


    /**
     * 消息确认回调
     **/
    @Autowired
    private RabbitTemplate.ConfirmCallback confirmCallback;

    /**
     * 消息返回回调
     **/
    @Autowired
    private RabbitTemplate.ReturnsCallback returnCallback;

    /**
     * @Author fangzhenxun
     * @Description  具体mq建造方法
     * @date 2020/2/28 21:03
     * @param
     * @return org.springframework.amqp.rabbit.core.RabbitTemplate
     */
    @Override
    public RabbitTemplate build() {
        RabbitTemplate rabbitTemplate = null;
        RabbitMqStrategy rabbitMqStrategy = currentRabbitMqStrategy();
        CachingConnectionFactory cachingConnectionFactory = rabbitMqStrategy.buildConnectionFactory();
        if (null == rabbitTemplate) {
            //设置工厂
            rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
            //则在发送阻塞之后，消费者的通道仍然是畅通的(使用单独的发送连接，避免生产者由于各种原因阻塞而导致消费者同样阻塞)
            //会多创建一个connection连接以.publisher结尾的连接，那么RabbitTemplate在创建Connection时，会根据这个boolean的值，选择使用ConnectionFactory本身或者ConnectionFactory中的publisherConnectionFactory（也即是一个ConnectionFactory）来创建
            rabbitTemplate.setUsePublisherConnection(true);
            //如果启用 Confirm 机制，设置 ConfirmCallback
            if (cachingConnectionFactory.getPublisherConnectionFactory().isPublisherConfirms()) {
                rabbitTemplate.setConfirmCallback(confirmCallback);
            }
            //如果启用 Return 机制，设置 ReturnCallback，及打开 Mandatory
            if (cachingConnectionFactory.getPublisherConnectionFactory().isPublisherReturns()) {
                rabbitTemplate.setReturnsCallback(returnCallback);
                // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
                rabbitTemplate.setMandatory(true);
            }
            //调用后初始化方法，没有它将抛出异常
            rabbitTemplate.afterPropertiesSet();
        }
        return rabbitTemplate;
    }



    /**
     * @Author fangzhenxun
     * @Description 当消息发送到交换机（exchange）时，该方法被调用，也就是只确认是否正确到达 Exchange 中
     * ；消息确认到达交换机回调方法（消息从生产者到交换机出现错误会走确认回调方法）
     * 1.如果消息没有到exchange,则 ack=false
     * 2.如果消息到达exchange,则 ack=true
     * @date 2020/3/1 15:21
     * @param
     * @return org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback
     */
    @Bean
    @ConditionalOnMissingBean(value = RabbitTemplate.ConfirmCallback.class)
    public RabbitTemplate.ConfirmCallback confirmCallback() {
        return (correlationData, ack, cause) -> {
            if (!ack) {
                log.error("找不到交换机 ===>消息唯一标识：{}, 确认结果: {}, 失败原因：{}", correlationData, ack, cause);
            }
        };
    }


    /**
     * @Author fangzhenxun
     * @Description 当消息从交换机到队列失败时，该方法被调用。（若成功，则不调用）
     *   需要注意的是：该方法调用后，MsgSendConfirmCallBack中的confirm方法也会被调用，且ack = true
     * @date 2020/3/1 15:22
     * @param
     * @return org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback
     */
    @Bean
    @ConditionalOnMissingBean(value = RabbitTemplate.ReturnsCallback.class)
    public RabbitTemplate.ReturnsCallback returnCallback() {
        return returned -> {
            log.error("找不到路由键 ====>  消息主体 message : {}, 消息code : {}, 描述：{}, 消息使用的交换器 exchange : {}, 消息使用的路由键 routing : {}", returned.getMessage(), returned.getReplyCode(), returned.getReplyText(), returned.getExchange(), returned.getRoutingKey());
        };
    }



    /**
     * @Author fangzhenxun
     * @Description  获取当前选中的rabbitmq
     * @date 2020/2/29 17:53
     * @param
     * @return com.xyt.mq.config.rabbitmq.stratety.RabbitMqStrategy
     */
    private RabbitMqStrategy currentRabbitMqStrategy() {
        if (rabbitMqStrategyList != null && !rabbitMqStrategyList.isEmpty()) {
            return rabbitMqStrategyList.parallelStream().filter(mqStrategy -> {
                String mqModel = mqStrategy.getType().name();
                if (type.equals(mqModel)) {
                    log.info("当前rabbitTemplate加载模式为========》" + mqStrategy.getType().getMqModel());
                }
                return type.equals(mqModel);
            }).findAny().orElseThrow(() ->new RuntimeException("没有找到对应的配置方式"));
        }
        throw new RuntimeException("没有找到对应的配置方式");
    }
}
